package cn._51doit.live.udf;

import cn._51doit.live.pojo.DataBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Flink {@link ProcessFunction} that converts JSON strings into {@link DataBean} objects.
 *
 * <p>Responsibilities:
 * <ol>
 *   <li>Conversion: each incoming JSON string is parsed into a {@code DataBean}.</li>
 *   <li>Filtering: records that are null, blank, unparseable, or that parse to
 *       {@code null} are dropped and never emitted.</li>
 * </ol>
 *
 * <p>Only parse failures are suppressed. Exceptions thrown while emitting a record
 * (for example by the collector) are propagated to the caller instead of being
 * silently swallowed.
 */
public class JsonToBeanFunction extends ProcessFunction<String, DataBean> {

    /**
     * Parses one JSON record and emits the resulting bean if the record is valid.
     *
     * @param value the raw JSON string; may be null or blank, in which case nothing is emitted
     * @param ctx   the Flink process context (unused)
     * @param out   collector that receives the successfully parsed bean
     * @throws Exception if emitting the record through {@code out} fails
     */
    @Override
    public void processElement(String value, Context ctx, Collector<DataBean> out) throws Exception {
        DataBean dataBean = parseOrNull(value);
        // Emit outside of any try/catch so that failures raised by collect()
        // are not mistaken for bad input and silently discarded.
        if (dataBean != null) {
            out.collect(dataBean);
        }
    }

    /**
     * Parses the given JSON text into a {@link DataBean}.
     *
     * @param value the raw JSON text, possibly null or blank
     * @return the parsed bean, or {@code null} when the input is null, blank,
     *         malformed, or parses to {@code null}
     */
    private static DataBean parseOrNull(String value) {
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        try {
            return JSON.parseObject(value, DataBean.class);
        } catch (RuntimeException ignored) {
            // Malformed records are intentionally filtered out (best-effort cleansing);
            // a single bad message must not fail the whole streaming job.
            return null;
        }
    }
}
